package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimAsyncJoinFunction;
import com.atguigu.bean.TradeSkuOrderBean;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.MyClickHouseUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

//Data flow: web/app -> Mysql -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//Programs:  Mock -> Mysql -> Maxwell -> Kafka(ZK) -> DwdTradeOrderDetailApp -> Kafka(ZK) -> Dws09TradeSkuOrderWindow(Redis,HDFS,ZK,HBase,Phoenix) -> ClickHouse(ZK)
/**
 * Flink streaming job that aggregates order-detail records per SKU in 10-second
 * tumbling event-time windows and writes the enriched result to ClickHouse.
 *
 * <p>Pipeline, as wired in {@link #main(String[])}:
 * <ol>
 *   <li>Read strings from the Kafka topic {@code dwd_trade_order_detail}.</li>
 *   <li>Parse each record into a {@code JSONObject}, dropping records that are
 *       {@code null} or that parse to {@code null}.</li>
 *   <li>Assign event-time timestamps from the {@code create_time} field with a
 *       2-second bounded out-of-orderness watermark.</li>
 *   <li>Key by order-detail {@code id} and keep only the first record per id
 *       (a TTL-backed "seen" flag), converting it to a {@code TradeSkuOrderBean}.</li>
 *   <li>Key by SKU id, sum the four amount fields per window, and stamp the
 *       window start/end and the processing time onto the result.</li>
 *   <li>Enrich the result through six chained asynchronous dimension joins
 *       (SKU, SPU, trademark, category 3/2/1).</li>
 *   <li>Sink to the ClickHouse table {@code dws_trade_sku_order_window}.</li>
 * </ol>
 */
public class Dws09TradeSkuOrderWindow {

    /**
     * Builds the job graph described on the class and submits it for execution.
     *
     * @param args command-line arguments (not read by this job)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        //TODO 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The settings below are disabled; enable them when the program must be
        // started/restored from a checkpoint or savepoint.
        // 2.1 Enable checkpointing every 5 seconds and specify the checkpoint consistency semantics
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Set the checkpoint timeout to 1 minute
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 Set the minimum pause between two checkpoints
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.5 Specify the automatic restart strategy used when recovering from a checkpoint
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 2.6 Set the state backend
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true) );
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 2.7 Set the user name used to access HDFS
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Read the Kafka order-detail topic and create the source stream
        String topic = "dwd_trade_order_detail";
        String groupId = "order_detail_220718";
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3. Filter out null values and convert each record to a JSON object
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (value != null) {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // parseObject can itself return null (e.g. for blank text); emitting
                    // that null would fail in the timestamp assigner below, so drop it here.
                    if (jsonObject != null) {
                        out.collect(jsonObject);
                    }
                }
            }
        });

        //TODO 4. Extract the event timestamp and generate watermarks (2 s bounded out-of-orderness)
        SingleOutputStreamOperator<JSONObject> jsonObjWithWMDS = jsonObjDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                // NOTE(review): the boolean flag presumably selects the full date-time format - confirm in DateFormatUtil
                return DateFormatUtil.toTs(element.getString("create_time"), true);
            }
        }));

        //TODO 5. Filter out duplicates produced by the upstream left join and convert records to JavaBean objects
        SingleOutputStreamOperator<TradeSkuOrderBean> tradeSkuOrderDS = jsonObjWithWMDS
                .keyBy(json -> json.getString("id"))
                .flatMap(new RichFlatMapFunction<JSONObject, TradeSkuOrderBean>() {

                    // Per-key "already seen" marker: non-null once a record for this id has been emitted.
                    private ValueState<String> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {

                        // The marker expires after 10 seconds; every read or write refreshes the timer.
                        StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10))
                                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                                .build();

                        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("value-state", String.class);
                        stateDescriptor.enableTimeToLive(ttlConfig);

                        valueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void flatMap(JSONObject value, Collector<TradeSkuOrderBean> out) throws Exception {

                        // Read the marker for the current id
                        String state = valueState.value();

                        // Only the first record seen for an id is emitted; later ones are dropped.
                        if (state == null) {
                            valueState.update("1");

                            // Missing coupon/activity amounts are treated as zero so the
                            // window reduce can add them without null checks.
                            BigDecimal couponAmount = value.getBigDecimal("split_coupon_amount");
                            if (couponAmount == null) {
                                couponAmount = BigDecimal.ZERO;
                            }

                            BigDecimal activityAmount = value.getBigDecimal("split_activity_amount");
                            if (activityAmount == null) {
                                activityAmount = BigDecimal.ZERO;
                            }

                            out.collect(TradeSkuOrderBean
                                    .builder()
                                    .skuId(value.getString("sku_id"))
                                    .orderAmount(value.getBigDecimal("split_total_amount"))
                                    .originalAmount(value.getBigDecimal("split_original_amount"))
                                    .couponAmount(couponAmount)
                                    .activityAmount(activityAmount)
                                    .build()
                            );
                        }
                    }
                });

        //TODO 6. Key by SKU, open a 10-second tumbling event-time window and aggregate
        //tradeSkuOrderDS.windowAll()
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceDS = tradeSkuOrderDS
                .keyBy(TradeSkuOrderBean::getSkuId)
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeSkuOrderBean>() {
                    @Override
                    public TradeSkuOrderBean reduce(TradeSkuOrderBean value1, TradeSkuOrderBean value2) throws Exception {
                        // Accumulate all four amounts into value1 and reuse it as the running total.
                        value1.setOriginalAmount(value1.getOriginalAmount().add(value2.getOriginalAmount()));
                        value1.setOrderAmount(value1.getOrderAmount().add(value2.getOrderAmount()));
                        value1.setActivityAmount(value1.getActivityAmount().add(value2.getActivityAmount()));
                        value1.setCouponAmount(value1.getCouponAmount().add(value2.getCouponAmount()));
                        return value1;
                    }
                }, new WindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<TradeSkuOrderBean> input, Collector<TradeSkuOrderBean> out) throws Exception {

                        // With an incremental reduce the iterable carries the single pre-aggregated bean
                        TradeSkuOrderBean next = input.iterator().next();

                        // Attach the processing timestamp and the window boundaries
                        next.setTs(System.currentTimeMillis());
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                        // Emit the result
                        out.collect(next);
                    }
                });

        reduceDS.print("reduceDS>>>>>>>");

        //TODO 7. Join dimension tables to add dimension attributes
        // Synchronous alternative, kept for reference only:
//        reduceDS.map(new RichMapFunction<TradeSkuOrderBean, TradeSkuOrderBean>() {
//            @Override
//            public void open(Configuration parameters) throws Exception {
//                // Obtain the Phoenix connection pool
//            }
//            @Override
//            public TradeSkuOrderBean map(TradeSkuOrderBean value) throws Exception {
//                // Read from Phoenix and copy the fields onto value:  select * from db.dim_xxx_xxx where id = ''
//                return value;
//            }
//        });

        // Each async join below uses unordered emission with a 100-second timeout.
        // The joins are chained because later lookups use ids filled in by earlier ones
        // (SKU -> SPU / trademark / category3 -> category2 -> category1).

        //SKU
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithSkuDS = AsyncDataStream.unorderedWait(reduceDS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_SKU_INFO") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getSkuId();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setSkuName(dimInfo.getString("SKU_NAME"));
                        input.setSpuId(dimInfo.getString("SPU_ID"));
                        input.setTrademarkId(dimInfo.getString("TM_ID"));
                        input.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        reduceWithSkuDS.print("reduceWithSkuDS>>>>>>>>>>>");

        //SPU
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(reduceWithSkuDS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_SPU_INFO") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getSpuId();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setSpuName(dimInfo.getString("SPU_NAME"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        //TradeMark
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithTrademarkDS = AsyncDataStream.unorderedWait(reduceWithSpuDS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_BASE_TRADEMARK") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getTrademarkId();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setTrademarkName(dimInfo.getString("TM_NAME"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        //Category3
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithCategory3DS = AsyncDataStream.unorderedWait(reduceWithTrademarkDS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_BASE_CATEGORY3") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getCategory3Id();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setCategory3Name(dimInfo.getString("NAME"));
                        input.setCategory2Id(dimInfo.getString("CATEGORY2_ID"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        //Category2
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithCategory2DS = AsyncDataStream.unorderedWait(reduceWithCategory3DS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_BASE_CATEGORY2") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getCategory2Id();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setCategory2Name(dimInfo.getString("NAME"));
                        input.setCategory1Id(dimInfo.getString("CATEGORY1_ID"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        //Category1
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceWithCategory1DS = AsyncDataStream.unorderedWait(reduceWithCategory2DS,
                new DimAsyncJoinFunction<TradeSkuOrderBean>("DIM_BASE_CATEGORY1") {
                    @Override
                    public String getKey(TradeSkuOrderBean input) {
                        return input.getCategory1Id();
                    }

                    @Override
                    public void join(TradeSkuOrderBean input, JSONObject dimInfo) {
                        input.setCategory1Name(dimInfo.getString("NAME"));
                    }
                },
                100,
                TimeUnit.SECONDS);

        reduceWithCategory1DS.print("reduceWithCategory1DS>>>>>>>>>>>>>>");

        //TODO 8. Write the enriched data out to ClickHouse
        reduceWithCategory1DS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_trade_sku_order_window values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        //TODO 9. Launch the job
        env.execute("Dws09TradeSkuOrderWindow");

    }

}
